Add SqliteMessageQueue and related tests #526
Summary of Changes
Hello @2chanhaeng, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the message queuing capabilities by introducing a new SQLite-based message queue implementation. Concurrently, it establishes a robust and standardized testing framework for message queues, which has been applied to existing implementations. This dual approach not only expands the available message queue options but also improves the consistency, reliability, and maintainability of message queue tests across the project, making future integrations and testing more efficient.
Code Review
This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, to unify message queue tests across the repository. The refactoring of existing tests to use this new utility is a great step towards reducing code duplication and ensuring consistent test coverage.
My review focuses on the new implementations. I've found a couple of critical issues in the SqliteMessageQueue related to race conditions and error handling that could lead to message loss and listener crashes. I've also identified areas for improvement in the new testMessageQueue utility to make it more robust and align with the MessageQueue interface. Additionally, I've suggested a minor improvement to the SQLite test to ensure proper cleanup of temporary database files.
2chanhaeng left a comment
How about adding `[Symbol.dispose]()` for consistency with other MessageQueue implementations?

That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?

Well, the cleanup method belongs to `sqlite/src/mq.ts`, so I don't think that's out of scope if you implement it only for SqliteMessageQueue. It seems quite simple, but handling the other message queues should be done in another PR, like you said. (amqp doesn't have it either.)
Oh, OK. I added it at 80e11c6!
SqliteMessageQueue.listen fedify-dev#526 (comment)
packages/sqlite/src/mq.ts
Outdated
`DELETE FROM "${this.#tableName}"
 WHERE id = (
   SELECT id FROM "${this.#tableName}"
   WHERE scheduled <= ?
   ORDER BY scheduled
   LIMIT 1
 )
 RETURNING id, message`,
While DELETE … RETURNING is atomic, when multiple processes execute the same query simultaneously, there's a potential race condition. Using BEGIN IMMEDIATE ensures proper locking and prevents duplicate message processing.
Also, don't forget you need to handle SQLITE_BUSY error as well!
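A minimal sketch of the suggestion above, not the code that ended up in the PR. The `Db` interface is a hypothetical stand-in for whatever synchronous SQLite driver is in use, `dequeueOne` and `isBusyError` are illustrative names, and the way the driver reports `SQLITE_BUSY` (an error `code` or message containing that string) is an assumption:

```typescript
// Hypothetical minimal driver surface; the real driver API may differ.
interface Db {
  exec(sql: string): void;
  prepare(sql: string): { get(...params: unknown[]): unknown };
}

interface Row {
  id: string;
  message: string;
}

// Assumption: lock contention surfaces as an Error whose `code` or
// message contains "SQLITE_BUSY".
function isBusyError(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  const code = (error as { code?: unknown }).code;
  return (typeof code === "string" && code.includes("SQLITE_BUSY")) ||
    error.message.includes("SQLITE_BUSY");
}

// Acquires the write lock before selecting, so two processes cannot pick
// the same row. `tableName` is assumed to be validated already.
function dequeueOne(db: Db, tableName: string, now: number): Row | undefined {
  // If this throws (for example with SQLITE_BUSY), no transaction was
  // opened, so there is nothing to roll back.
  db.exec("BEGIN IMMEDIATE");
  try {
    const row = db
      .prepare(
        `DELETE FROM "${tableName}"
         WHERE id = (
           SELECT id FROM "${tableName}"
           WHERE scheduled <= ?
           ORDER BY scheduled
           LIMIT 1
         )
         RETURNING id, message`,
      )
      .get(now) as Row | undefined;
    db.exec("COMMIT");
    return row;
  } catch (error) {
    db.exec("ROLLBACK");
    throw error;
  }
}
```

A caller would typically catch the thrown error, check it with `isBusyError()`, and retry after a short delay instead of treating it as fatal.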
static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/;
// In-memory event emitter for notifying listeners when messages are enqueued.
// Scoped per table name to allow multiple queues to coexist.
static readonly #notifyChannels = new Map<string, EventTarget>();
The static #notifyChannels map can accumulate EventTarget instances indefinitely. Consider implementing cleanup logic when the last listener for a channel is removed.
This is a minor issue for most use cases, but worth noting for long-running applications with many different table names.
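One way to do the cleanup described above is reference counting, sketched here outside the class. The names `channels`, `acquireChannel`, and `releaseChannel` are hypothetical, and this is not necessarily how the fix in the PR works:

```typescript
// Each channel remembers how many users currently hold it.
const channels = new Map<string, { target: EventTarget; refs: number }>();

// Returns the shared EventTarget for a table name, creating it on first use.
function acquireChannel(name: string): EventTarget {
  let entry = channels.get(name);
  if (entry == null) {
    entry = { target: new EventTarget(), refs: 0 };
    channels.set(name, entry);
  }
  entry.refs++;
  return entry.target;
}

// Drops one reference; the map entry is removed once nobody uses it,
// so the map cannot grow without bound.
function releaseChannel(name: string): void {
  const entry = channels.get(name);
  if (entry == null) return;
  entry.refs--;
  if (entry.refs <= 0) channels.delete(name);
}
```

Every `acquireChannel()` call needs a matching `releaseChannel()`, for example when a listener's signal aborts or the queue is disposed.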
Fixed at a2851bf!
packages/sqlite/src/mq.ts
Outdated
    new EnqueueEvent(delayMs),
  );
} catch (error) {
  this.#db.exec("ROLLBACK");
Currently, errors are caught and re-thrown without logging. Adding error logs would help with debugging:
- this.#db.exec("ROLLBACK");
+ this.#db.exec("ROLLBACK");
+ logger.error("Failed to enqueue messages: {error}", { error, messages });
Fixed at c1697fc!
packages/sqlite/src/mq.ts
Outdated
this.#db
  .prepare(
    `INSERT INTO "${this.#tableName}" (id, message, created, scheduled)
     VALUES (?, ?, ?, ?)`,
  )
  .run(id, encodedMessage, now, scheduled);
Even with busy_timeout set, SQLITE_BUSY errors can still occur when the timeout expires. In a multi-process message queue, this is a normal condition that requires retry logic.
Consider also adding retry logic to enqueueMany() and listen() methods!
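The retry logic asked for here could look roughly like the sketch below. `withBusyRetries`, its option names, and the default busy check (an error `code` or message containing `SQLITE_BUSY`) are assumptions made for illustration, not the helper used in the PR:

```typescript
interface RetryOptions {
  maxRetries?: number;
  baseDelayMs?: number;
  isBusy?: (error: unknown) => boolean;
}

// Assumption: the driver reports lock contention with "SQLITE_BUSY"
// in the error's `code` or message.
function defaultIsBusy(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  const code = (error as { code?: unknown }).code;
  return (typeof code === "string" && code.includes("SQLITE_BUSY")) ||
    error.message.includes("SQLITE_BUSY");
}

// Runs a synchronous database operation, retrying with exponential
// backoff while it fails with a busy error. Other errors, and busy
// errors after the last retry, are rethrown unchanged.
async function withBusyRetries<T>(
  operation: () => T,
  options: RetryOptions = {},
): Promise<T> {
  const { maxRetries = 5, baseDelayMs = 10, isBusy = defaultIsBusy } = options;
  for (let attempt = 0; ; attempt++) {
    try {
      return operation();
    } catch (error) {
      if (!isBusy(error) || attempt >= maxRetries) throw error;
      const delayMs = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
```

The same wrapper can be placed around the insert in `enqueue()`, the batch insert in `enqueueMany()`, and the dequeue statement in `listen()`.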
Fixed at 06301ba!
if (this.#initialized) {
  return;
}
Without journal_mode=WAL, SQLite uses the default rollback journal mode which severely limits concurrent access. This will cause reader/writer conflicts in multi-process environments.
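As a rough illustration of the point above, initialization could switch the journal mode before creating the table. `journal_mode` and `busy_timeout` are standard SQLite pragmas; the `Db` interface, the function name, and the 5000 ms default are assumptions for this sketch:

```typescript
// Hypothetical minimal driver surface; only exec() is needed here.
interface Db {
  exec(sql: string): void;
}

// Enables write-ahead logging so readers do not block the writer,
// and sets how long SQLite waits on a lock before reporting SQLITE_BUSY.
function configureForConcurrency(db: Db, busyTimeoutMs = 5000): void {
  db.exec("PRAGMA journal_mode = WAL");
  db.exec(`PRAGMA busy_timeout = ${Math.trunc(busyTimeoutMs)}`);
}
```

WAL mode is persistent for a database file, so running the pragma on every initialization is harmless.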
Fixed at c128708!
- Remove `let Temporal ...`
- Export `@fedify/sqlite`
- Add missing dependencies
SqliteMessageQueue.listen fedify-dev#526 (comment)
/gemini review
Code Review
This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, which significantly improves the structure and maintainability of message queue tests by refactoring existing ones. The implementation of SqliteMessageQueue is well-done, showing careful consideration for SQLite's concurrency characteristics.
My review focuses on the new testing framework and the refactored tests. I've identified a critical issue in the new testMessageQueue utility that could cause tests to fail for correctly implemented message queues. Additionally, I've found that some of the refactored tests for existing message queues do not correctly test for concurrent access as intended. I've also noted a minor resource leak in the new test for SqliteMessageQueue. The provided feedback includes code suggestions to address these points.
fedify-dev#526 (review)
- Fixed `testMessageQueue` to pass `{ signal }` options object instead of `AbortController` directly to `listen()` method
- Fixed PostgreSQL test to share same table and channel between `mq1` and `mq2` instances by moving key generation outside factory function
- Fixed Redis test to share same queue keys between `mq1` and `mq2` instances by moving key generation outside factory function
- Fixed SQLite test cleanup to dispose both `mq1` and `mq2` instances, preventing resource leak, and removed redundant `mq2.drop()` call
/gemini review
Code Review
This pull request introduces a new SqliteMessageQueue implementation and a standardized testMessageQueue utility. The new SQLite message queue is well-designed for single-node deployments, incorporating robust features like transaction management, busy-error retries with exponential backoff, and in-memory event notifications. The testMessageQueue utility provides a comprehensive test harness, significantly reducing code duplication and ensuring consistent test coverage across all message queue implementations. Existing message queue tests have been successfully refactored to leverage this new utility, improving readability and maintainability. The documentation has been updated to reflect the new SQLite message queue, and new mise tasks have been added for per-package operations, enhancing the development workflow.
if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) {
  throw new Error(
    `Invalid table name for the message queue: ${this.#tableName}`,
  );
The regular expression #tableNameRegex allows underscores (_) in table names, but the error message only mentions "letters, digits". It would be clearer to explicitly state that underscores are also allowed.
throw new Error(
  `Invalid table name for the message queue: ${this.#tableName}. Only letters, digits, and underscores are allowed.`,
);

const result = await this.#withTransactionRetries(() => {
  return this.#db
    .prepare(
      `DELETE FROM "${this.#tableName}"
       WHERE id = (
         SELECT id FROM "${this.#tableName}"
         WHERE scheduled <= ?
         ORDER BY scheduled
         LIMIT 1
       )
       RETURNING id, message`,
    )
    .get(now) as { id: string; message: string } | undefined;
});
The DELETE ... RETURNING statement is wrapped in #withTransactionRetries. While this ensures atomicity and retries for the delete operation itself, the poll function's while loop continues to fetch and delete messages. If handler(message) fails, the message is re-enqueued (due to throw error at line 319), but the poll loop immediately breaks (line 327). This means that if a message consistently fails, it might block the queue from processing other messages until the next poll interval or enqueue event. Consider if the poll loop should continue attempting to process other messages even after a handler failure, or if a failing message should be moved to a dead-letter queue or have its scheduled time updated to prevent immediate re-processing.
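To make the alternatives in this comment concrete, here is a sketch that uses an in-memory array as a stand-in for the SQLite table. A failed message is pushed back with a later scheduled time, or moved to a dead-letter list after too many attempts, and the loop keeps going with the remaining messages. All names and defaults (`drainDue`, `retryDelayMs`, `maxAttempts`) are hypothetical and not part of the PR:

```typescript
interface QueuedMessage<T> {
  id: string;
  message: T;
  scheduled: number; // epoch milliseconds
  attempts: number;
}

// Processes every message that is due at `now`. Returns the messages
// that exhausted their attempts. Assumes retryDelayMs > 0, so that a
// rescheduled message is never picked up again in the same pass.
async function drainDue<T>(
  queue: QueuedMessage<T>[],
  handler: (message: T) => void | Promise<void>,
  now: number,
  retryDelayMs = 1000,
  maxAttempts = 3,
): Promise<QueuedMessage<T>[]> {
  const deadLetters: QueuedMessage<T>[] = [];
  while (true) {
    // Pick the earliest message that is already due.
    let index = -1;
    for (let i = 0; i < queue.length; i++) {
      if (queue[i].scheduled > now) continue;
      if (index < 0 || queue[i].scheduled < queue[index].scheduled) index = i;
    }
    if (index < 0) break;
    const [entry] = queue.splice(index, 1);
    try {
      await handler(entry.message);
    } catch {
      entry.attempts++;
      if (entry.attempts >= maxAttempts) {
        deadLetters.push(entry); // give up on this message
      } else {
        // Delay the retry instead of blocking the messages behind it.
        entry.scheduled = now + retryDelayMs * entry.attempts;
        queue.push(entry);
      }
    }
  }
  return deadLetters;
}
```

In the SQLite version, rescheduling would presumably mean re-inserting the row with an updated `scheduled` value rather than mutating an object.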
while (signal == null || !signal.aborted) {
  let timeout: ReturnType<typeof setTimeout> | undefined;
  await new Promise<unknown>((resolve) => {
    signal?.addEventListener("abort", resolve);
    timeout = setTimeout(() => {
      signal?.removeEventListener("abort", resolve);
      resolve(0);
    }, this.#pollIntervalMs);
    timeouts.add(timeout);
  });
  if (timeout != null) timeouts.delete(timeout);
  await poll();
The setTimeout in the periodic polling loop (lines 354-359) is added to timeouts but never removed if the signal aborts before the timeout fires. This could lead to a memory leak if many listen calls are made and aborted before their respective timeouts complete. The if (timeout != null) timeouts.delete(timeout); at line 360 only handles the case where the timeout does fire.
await new Promise<unknown>((resolve) => {
  const abortListener = () => {
    if (timeout != null) clearTimeout(timeout);
    resolve();
  };
  signal?.addEventListener("abort", abortListener, { once: true });
  timeout = setTimeout(() => {
    signal?.removeEventListener("abort", abortListener);
    resolve(0);
  }, this.#pollIntervalMs);
  timeouts.add(timeout);
});
if (timeout != null) timeouts.delete(timeout);

    this.#db.close();
    this.#unregisterInstance();
  } catch (error) {
    logger.error(
      "Failed to close the database connection for table {tableName}: {error}",
      { tableName: this.#tableName, error },
    );
  }
}
The [Symbol.dispose]() method logs an error if this.#db.close() fails, but it doesn't prevent the instance from being unregistered. If close() fails, the database might still be open or in an inconsistent state, and unregistering the instance might lead to further issues or resource leaks. Consider re-throwing the error or at least making the unregistration conditional on successful closing.
await mq1.enqueue("Hello, world!");
await waitFor(() => messages.length > 0, 15_000);
deepStrictEqual(messages, ["Hello, world!"]);
The deepStrictEqual assertion here compares messages (an array) directly to ["Hello, world!"]. Given that messages are processed by multiple listeners (mq1.listen and mq2.listen), and only one listener should receive each message, the messages array might contain the message once, but the order or which listener received it isn't guaranteed. While the current test might pass due to quick processing, it's safer to assert that the message is included in the collected messages, or that the total count is correct, rather than a strict equality of the array content and order, especially in a multi-listener scenario.
- await mq1.enqueue("Hello, world!");
- await waitFor(() => messages.length > 0, 15_000);
- deepStrictEqual(messages, ["Hello, world!"]);
+ await waitFor(() => messages.length > 0, 15_000);
+ ok(messages.includes("Hello, world!"), "'Hello, world!' message should be received");
+ deepStrictEqual(messages.length, 1, "Only one 'Hello, world!' message should be received");

deepStrictEqual(messages, ["Hello, world!", "Delayed message"]);
ok(
Similar to the previous comment, deepStrictEqual(messages, ["Hello, world!", "Delayed message"]) might be too strict. The order of messages, especially when delays are involved and multiple listeners are active, is not strictly guaranteed to be ["Hello, world!", "Delayed message"]. It's more robust to check for the presence of both messages and the total count.
await waitFor(() => messages.length > 1, 15_000);
ok(messages.includes("Hello, world!"), "'Hello, world!' message should be received");
ok(messages.includes("Delayed message"), "'Delayed message' should be received");
deepStrictEqual(messages.length, 2, "Two messages should be received");

];
await mq1.enqueueMany(batchMessages);
await waitFor(() => messages.length >= batchMessages.length, 15_000);
deepStrictEqual(new Set(messages), new Set(batchMessages));
The assertion deepStrictEqual(new Set(messages), new Set(batchMessages)) correctly checks for the presence of all messages regardless of order. However, it doesn't explicitly verify that each message was received exactly once, which is a crucial aspect of message queue behavior (each message should be consumed by only one listener). While the testMessageQueue description mentions "Ensures messages are processed by only one listener", this specific assertion doesn't fully cover it. Consider adding a check for the total count of messages received to ensure no duplicates or missing messages.
deepStrictEqual(new Set(messages), new Set(batchMessages));
deepStrictEqual(messages.length, batchMessages.length, "All batch messages should be received exactly once");

  () => messages.length >= delayedBatchMessages.length,
  15_000,
);
deepStrictEqual(new Set(messages), new Set(delayedBatchMessages));
Similar to the previous comment, this assertion checks for presence but not necessarily for unique consumption. Adding a length check would strengthen the test for the "processed by only one listener" guarantee.
deepStrictEqual(new Set(messages), new Set(delayedBatchMessages));
deepStrictEqual(messages.length, delayedBatchMessages.length, "All delayed batch messages should be received exactly once");

const expectedMessages = new Set(
  Array.from({ length: bulkCount }, (_, i) => `message-${i}`),
);
deepStrictEqual(new Set(messages), expectedMessages);
For the bulk enqueue stress test, deepStrictEqual(new Set(messages), expectedMessages) checks for presence. To fully ensure that each of the bulkCount messages was received exactly once and no duplicates or missing messages occurred, an additional check on the length of the messages array would be beneficial.
deepStrictEqual(new Set(messages), expectedMessages);
deepStrictEqual(messages.length, bulkCount, "All bulk messages should be received exactly once");
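The set-plus-length pattern suggested in the last few comments could be factored into one helper. `assertExactlyOnce` is a hypothetical name, not part of `@fedify/testing`, and the sketch assumes the expected messages are distinct primitive values:

```typescript
import { deepStrictEqual, strictEqual } from "node:assert/strict";

// Checks that `received` holds every expected message exactly once,
// in any order. Assumes `expected` contains no duplicates.
function assertExactlyOnce<T>(
  received: readonly T[],
  expected: readonly T[],
): void {
  // Equal length rules out duplicates and missing messages by count.
  strictEqual(
    received.length,
    expected.length,
    "Each message should be received exactly once",
  );
  // Equal sets confirm the same messages arrived, regardless of order.
  deepStrictEqual(new Set(received), new Set(expected));
}
```

A call such as `assertExactlyOnce(messages, batchMessages)` would then replace each pair of assertions above.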
Summary
Introduce the `SqliteMessageQueue` class along with a testing framework for message queues. Refactor existing tests to utilize the new `testMessageQueue` utility for improved test structure and readability.

Related issue
- `SqliteMessageQueue` for single-node deployments #477

Changes
- `SqliteMessageQueue` class to `@fedify/sqlite` package implementing the `MessageQueue` interface using SQLite as the backing store
- `testMessageQueue()` utility function to `@fedify/testing` package for standardized testing of `MessageQueue` implementations
- `waitFor()` and `getRandomKey()` helper functions to `@fedify/testing`
- `testMessageQueue`:
  - `@fedify/amqp`
  - `@fedify/denokv`
  - `@fedify/postgres`
  - `@fedify/redis`
- `SqliteMessageQueue`

Benefits
- a reusable test harness
- `MessageQueue` implementations
- `MessageQueue` implementations in the future with standardized testing

Checklist
- `mise test` on your machine?